In [1]:
# Python 2 notebook: import true division first so "/" returns floats when
# crime rates are computed below.
from __future__ import division

from collections import defaultdict
import pickle

import pyspark
from pyspark.sql import Row
# Only udf is needed from pyspark.sql.functions; a wildcard import would
# clash with the "month" udf defined in the next cell.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
In [2]:
# Load the parquet file of crime records (parquetFile is the Spark 1.x API).
old_df = sqlContext.parquetFile('./master')
# Add an integer "month" column extracted from the date column.
month = udf(lambda date_time: date_time.month, IntegerType())
df = old_df.withColumn("month", month(old_df.date))
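In [ ]:
# Optional sanity check (a sketch): confirm the expected columns are present
# before aggregating. On Spark 1.4+ the same file could also be loaded with
# sqlContext.read.parquet('./master').
df.printSchema()
df.select("date", "month").show(5)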
In [8]:
crime_district_df = df.select(df.year, df.month, df.district, df.primarytype)
# Register the DataFrame as a temporary table so it can be queried with SQL.
crime_district_df.registerTempTable("crime_district_df_table")
# Monthly crime counts per district and per primary type. The count column is
# aliased to "cnt" because Row.count collides with Python's tuple.count method.
column_district = sqlContext.sql("select year, month, district, count(*) as cnt \
from crime_district_df_table group by year, month, district")
column_primaryType = sqlContext.sql("select year, month, primarytype, count(*) as cnt \
from crime_district_df_table group by year, month, primarytype")
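In [ ]:
# The same aggregation with the DataFrame API instead of SQL (a sketch of an
# alternative; the SQL results above are what the rest of the notebook uses):
from pyspark.sql.functions import count
column_district_alt = crime_district_df.groupBy("year", "month", "district") \
    .agg(count("*").alias("cnt"))
column_primaryType_alt = crime_district_df.groupBy("year", "month", "primarytype") \
    .agg(count("*").alias("cnt"))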
In [9]:
# collect() returns every record as a list of Row objects on the driver; the
# aggregated tables are small (one row per year/month/key), so this is safe.
district_rows = column_district.collect()
primarytype_rows = column_primaryType.collect()
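In [ ]:
# Optional: confirm the collected results are small enough to sit comfortably
# on the driver (a quick sketch).
len(district_rows), len(primarytype_rows)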
In [10]:
# Two nested dictionaries act as placeholders for the aggregated data; they
# build the {row_key: {column: value}} structure the starbase library expects.
big_dict1 = defaultdict(dict)   # {year_month: {district: count}}
big_dict2 = defaultdict(dict)   # {year_month: {primarytype: count}}

def insert_count(target, key1, key2, value):
    # Store value under target[key1][key2], keeping the first value seen
    # (the group-by keys are unique, so nothing is ever overwritten).
    if key2 not in target[key1]:
        target[key1][key2] = value
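In [ ]:
# Minimal illustration of the structure insert_count builds (the keys and
# counts here are hypothetical):
demo = defaultdict(dict)
insert_count(demo, '2014_7', '011', 1203)
insert_count(demo, '2014_7', '012', 987)
dict(demo)   # {'2014_7': {'011': 1203, '012': 987}}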
In [12]:
# Fill the dictionaries, using "year_month" strings as HBase row keys.
for entry in district_rows:
    insert_count(big_dict1, str(entry.year) + '_' + str(entry.month), entry.district, entry.cnt)
for entry in primarytype_rows:
    insert_count(big_dict2, str(entry.year) + '_' + str(entry.month), entry.primarytype, entry.cnt)
# Total number of crimes in each month, summed over districts...
big_dict1_total = dict()
for year_month, value in big_dict1.items():
    big_dict1_total[year_month] = sum(value.values())
# ...and summed over primary types.
big_dict2_total = dict()
for year_month, value in big_dict2.items():
    big_dict2_total[year_month] = sum(value.values())
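In [ ]:
# Consistency check (assumes every record has both a district and a primary
# type): both totals count the same crimes, so they should agree per month.
assert big_dict1_total == big_dict2_total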
In [14]:
# Convert raw counts into rates: each district's share of that month's crimes
# (true division thanks to the __future__ import in cell 1).
for year_month, district_data in big_dict1.items():
    for district in district_data.keys():
        big_dict1[year_month][district] = district_data[district] / big_dict1_total[year_month]
# Likewise for each primary crime type.
for year_month, primarytype_data in big_dict2.items():
    for primarytype in primarytype_data.keys():
        big_dict2[year_month][primarytype] = primarytype_data[primarytype] / big_dict2_total[year_month]
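In [ ]:
# Sanity check (a sketch): after normalization the rates within each month
# should sum to 1, up to floating-point rounding.
for year_month in big_dict1:
    assert abs(sum(big_dict1[year_month].values()) - 1.0) < 1e-9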
In [ ]:
# Push both dictionaries into HBase through its REST gateway via starbase.
# The connection parameters below are assumptions; point them at your cluster.
from starbase import Connection
c = Connection(host='127.0.0.1', port=8000)

t = c.table('ChicagoCrimeRecords')   # create an HBase table handle
t.create('District', 'Primarytype')  # create two column families

# Batch operations work like normal inserts and updates, but are sent in batch.
b = t.batch()
if b:
    for row_key in big_dict1.keys():
        # one row per year_month; district rates go in the District family
        b.insert(row_key, {'District': big_dict1[row_key]})
    b.commit(finalize=True)

b = t.batch()
if b:
    for row_key in big_dict2.keys():
        # primary-type rates go in the Primarytype family
        b.insert(row_key, {'Primarytype': big_dict2[row_key]})
    b.commit(finalize=True)
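In [ ]:
# Optional read-back (a sketch; the row key '2014_7' is a hypothetical example
# of the year_month keys inserted above):
t.fetch('2014_7')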